package com.changgou.canal.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.canal.config.RabbitMQConfig;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

@CanalEventListener  //声明当前的类是canal的监听类
public class BaseListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*
    * EntryType:当前操作的数据库类型
    *
    * RowData:当前操作数据库的数据
    * */
    @ListenPoint(schema = "changgou_business",table = "tb_ad") //schema:指当前操作的哪个数据库,table:指当前操作的哪张表
    public void adUpdate(CanalEntry.EntryType entryType,CanalEntry.RowData rowData){
        System.out.println("广告表数据发生改变");

        //获取改变之前的数据
       /* rowData.getBeforeColumnsList().forEach((c)->
                System.out.println("改变前的数据:"+c.getName()+":"+c.getValue()));*/

        //获取改变之后的数据
        /*rowData.getAfterColumnsList().forEach((c)->
                System.out.println("改变前的数据:"+c.getName()+":"+c.getValue()));*/

        //获取改变之后的数据中的position
        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
               if("position".equals(column.getName())){
                   System.out.println("发送最新的消息到MQ:"+column.getValue());

                   //发送消息
                   rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,column.getValue());
            }
        }
    }
}
